Page MenuHomePhabricator

D5037.id16236.diff
No OneTemporary

D5037.id16236.diff

diff --git a/services/backup/blob_client/src/get_client.rs b/services/backup/blob_client/src/get_client.rs
--- a/services/backup/blob_client/src/get_client.rs
+++ b/services/backup/blob_client/src/get_client.rs
@@ -6,15 +6,15 @@
use proto::GetRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
-use crate::tools::{check_error, report_error};
+use crate::tools::{check_error, report_error, c_char_pointer_to_string, string_to_c_char_pointer};
use lazy_static::lazy_static;
use libc;
use libc::c_char;
-use std::ffi::CStr;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
+use std::collections::HashMap;
struct ReadClient {
rx: mpsc::Receiver<Vec<u8>>,
@@ -22,20 +22,20 @@
}
lazy_static! {
- static ref CLIENT: Arc<Mutex<Option<ReadClient>>> =
- Arc::new(Mutex::new(None));
+ // todo: we should consider limiting the clients size,
+ // if every client is able to allocate up to 4MB data at a time
+ static ref CLIENTS: Arc<Mutex<HashMap<String, ReadClient>>> =
+ Arc::new(Mutex::new(HashMap::new()));
static ref RUNTIME: Runtime = Runtime::new().unwrap();
static ref ERROR_MESSAGES: Arc<Mutex<Vec<String>>> =
Arc::new(Mutex::new(Vec::new()));
}
-fn is_initialized() -> bool {
- if let Ok(client) = CLIENT.lock() {
- if client.is_some() {
- return true;
- }
+fn is_initialized(holder: &String) -> bool {
+ if let Ok(clients) = CLIENTS.lock() {
+ return clients.contains_key(holder);
} else {
- report_error(&ERROR_MESSAGES, "couldn't access client", Some("get"));
+ report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
}
false
}
@@ -43,15 +43,12 @@
pub fn get_client_initialize_cxx(
holder_char: *const c_char,
) -> Result<(), String> {
- let initialized = is_initialized();
- if initialized {
- get_client_terminate_cxx()?;
+ let holder = c_char_pointer_to_string(holder_char);
+ if is_initialized(&holder) {
+ get_client_terminate_cxx(string_to_c_char_pointer(&holder))?;
}
- assert!(!is_initialized(), "client cannot be initialized twice");
-
- let holder_cstr: &CStr = unsafe { CStr::from_ptr(holder_char) };
- let holder: String = holder_cstr.to_str().unwrap().to_owned();
+ assert!(!is_initialized(&holder), "client cannot be initialized twice");
// grpc
if let Ok(mut grpc_client) =
@@ -62,8 +59,9 @@
mpsc::Sender<Vec<u8>>,
mpsc::Receiver<Vec<u8>>,
) = mpsc::channel(MPSC_CHANNEL_BUFFER_CAPACITY);
+ let cloned_holder = holder.clone();
let rx_handle = RUNTIME.spawn(async move {
- if let Ok(response) = grpc_client.get(GetRequest { holder }).await {
+ if let Ok(response) = grpc_client.get(GetRequest { holder: cloned_holder }).await {
let mut inner_response = response.into_inner();
let mut response_present = true;
while response_present {
@@ -101,11 +99,12 @@
}
});
- if let Ok(mut client) = CLIENT.lock() {
- *client = Some(ReadClient {
+ if let Ok(mut clients) = CLIENTS.lock() {
+ let client = ReadClient {
rx_handle,
rx: response_thread_rx,
- });
+ };
+ (*clients).insert(holder, client);
return Ok(());
}
return Err("could not access client".to_string());
@@ -113,18 +112,19 @@
Err("could not successfully connect to the blob server".to_string())
}
-pub fn get_client_blocking_read_cxx() -> Result<Vec<u8>, String> {
+pub fn get_client_blocking_read_cxx(holder_char: *const c_char) -> Result<Vec<u8>, String> {
+ let holder = c_char_pointer_to_string(holder_char);
let mut response: Option<Vec<u8>> = None;
check_error(&ERROR_MESSAGES)?;
RUNTIME.block_on(async {
- if let Ok(mut maybe_client) = CLIENT.lock() {
- if let Some(mut client) = (*maybe_client).take() {
+ if let Ok(mut clients) = CLIENTS.lock() {
+ let maybe_client = clients.get_mut(&holder);
+ if let Some(client) = maybe_client {
if let Some(data) = client.rx.recv().await {
response = Some(data);
} else {
response = Some(vec![]);
}
- *maybe_client = Some(client);
} else {
report_error(&ERROR_MESSAGES, "no client present", Some("get"));
}
@@ -137,13 +137,15 @@
Ok(response)
}
-pub fn get_client_terminate_cxx() -> Result<(), String> {
+pub fn get_client_terminate_cxx(holder_char: *const c_char) -> Result<(), String> {
+ let holder = c_char_pointer_to_string(holder_char);
check_error(&ERROR_MESSAGES)?;
- if !is_initialized() {
+ if !is_initialized(&holder) {
return Ok(());
}
- if let Ok(mut maybe_client) = CLIENT.lock() {
- if let Some(client) = (*maybe_client).take() {
+ if let Ok(mut clients) = CLIENTS.lock() {
+ let maybe_client = clients.remove(&holder);
+ if let Some(client) = maybe_client {
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
report_error(
@@ -161,7 +163,7 @@
}
assert!(
- !is_initialized(),
+ !is_initialized(&holder),
"client transmitter handler released properly"
);
check_error(&ERROR_MESSAGES)?;
diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -34,7 +34,7 @@
unsafe fn get_client_initialize_cxx(
holder_char: *const c_char,
) -> Result<()>;
- fn get_client_blocking_read_cxx() -> Result<Vec<u8>>;
- fn get_client_terminate_cxx() -> Result<()>;
+ unsafe fn get_client_blocking_read_cxx(holder_char: *const c_char) -> Result<Vec<u8>>;
+ unsafe fn get_client_terminate_cxx(holder_char: *const c_char) -> Result<()>;
}
}

File Metadata

Mime Type
text/plain
Expires
Fri, Nov 29, 1:14 PM (19 h, 16 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
2597108
Default Alt Text
D5037.id16236.diff (5 KB)

Event Timeline