Page MenuHomePhabricator

D5092.diff
No OneTemporary

D5092.diff

diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -7,8 +7,7 @@
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
use crate::tools::{
- c_char_pointer_to_string, c_char_pointer_to_string_new, check_error,
- report_error, string_to_c_char_pointer, string_to_c_char_pointer_new,
+ c_char_pointer_to_string_new, string_to_c_char_pointer_new,
};
use anyhow::bail;
use crate::RUNTIME;
@@ -22,7 +21,7 @@
struct ReadClient {
rx: mpsc::Receiver<Vec<u8>>,
- rx_handle: JoinHandle<()>,
+ rx_handle: JoinHandle<anyhow::Result<(), anyhow::Error>>,
}
lazy_static! {
@@ -34,16 +33,7 @@
Mutex::new(Vec::new());
}
-fn is_initialized(holder: &str) -> bool {
- if let Ok(clients) = CLIENTS.lock() {
- return clients.contains_key(holder);
- } else {
- report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
- }
- false
-}
-
-fn is_initialized_new(holder: &String) -> anyhow::Result<bool, anyhow::Error> {
+fn is_initialized(holder: &str) -> anyhow::Result<bool, anyhow::Error> {
if let Ok(clients) = CLIENTS.lock() {
return Ok(clients.contains_key(holder));
}
@@ -52,13 +42,11 @@
pub fn get_client_initialize_cxx(
holder_char: *const c_char,
-) -> Result<(), String> {
- let holder = c_char_pointer_to_string(holder_char)?;
-
- assert!(
- !is_initialized(&holder),
- "client cannot be initialized twice"
- );
+) -> anyhow::Result<(), anyhow::Error> {
+ let holder = c_char_pointer_to_string_new(holder_char)?;
+ if is_initialized(&holder)? {
+ get_client_terminate_cxx(string_to_c_char_pointer_new(&holder)?)?;
+ }
// grpc
if let Ok(mut grpc_client) =
@@ -69,48 +57,29 @@
mpsc::channel::<Vec<u8>>(MPSC_CHANNEL_BUFFER_CAPACITY);
let cloned_holder = holder.clone();
let rx_handle = RUNTIME.spawn(async move {
- if let Ok(response) = grpc_client
+ let response = grpc_client
.get(GetRequest {
holder: cloned_holder,
})
- .await
- {
- let mut inner_response = response.into_inner();
- loop {
- match inner_response.message().await {
- Ok(maybe_data) => {
- let mut result = false;
- if let Some(data) = maybe_data {
- let data: Vec<u8> = data.data_chunk;
- result = match response_thread_tx.send(data).await {
- Ok(_) => true,
- Err(err) => {
- report_error(
- &ERROR_MESSAGES,
- &err.to_string(),
- Some("get"),
- );
- false
- }
- }
- }
- if !result {
- break;
- }
- }
+ .await?;
+ let mut inner_response = response.into_inner();
+ loop {
+ let maybe_data = inner_response.message().await?;
+ let mut result = false;
+ if let Some(data) = maybe_data {
+ let data: Vec<u8> = data.data_chunk;
+ result = match response_thread_tx.send(data).await {
+ Ok(_) => true,
Err(err) => {
- report_error(&ERROR_MESSAGES, &err.to_string(), Some("get"));
- break;
+ bail!(err);
}
- };
+ }
+ }
+ if !result {
+ break;
}
- } else {
- report_error(
- &ERROR_MESSAGES,
- "couldn't perform grpc get operation",
- Some("get"),
- );
}
+ Ok(())
});
if let Ok(mut clients) = CLIENTS.lock() {
@@ -121,9 +90,9 @@
(*clients).insert(holder, client);
return Ok(());
}
- return Err("could not access client".to_string());
+ bail!("could not access client");
}
- Err("could not successfully connect to the blob server".to_string())
+ bail!("could not successfully connect to the blob server")
}
pub fn get_client_blocking_read_cxx(
@@ -148,7 +117,7 @@
holder_char: *const c_char,
) -> anyhow::Result<(), anyhow::Error> {
let holder = c_char_pointer_to_string_new(holder_char)?;
- if !is_initialized_new(&holder)? {
+ if !is_initialized(&holder)? {
return Ok(());
}
if let Ok(mut clients) = CLIENTS.lock() {
@@ -169,7 +138,7 @@
bail!("couldn't access client");
}
- if is_initialized_new(&holder)? {
+ if is_initialized(&holder)? {
bail!("client transmitter handler released properly");
}
Ok(())

File Metadata

Mime Type
text/plain
Expires
Tue, Nov 26, 6:20 AM (17 h, 28 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2583230
Default Alt Text
D5092.diff (4 KB)

Event Timeline