Page MenuHomePhorge

D5035.1767390051.diff
No OneTemporary

Size
8 KB
Referenced Files
None
Subscribers
None

D5035.1767390051.diff

diff --git a/services/backup/blob_client/src/lib.rs b/services/backup/blob_client/src/lib.rs
--- a/services/backup/blob_client/src/lib.rs
+++ b/services/backup/blob_client/src/lib.rs
@@ -15,13 +15,20 @@
#[cxx::bridge]
mod ffi {
extern "Rust" {
- fn put_client_initialize_cxx() -> Result<()>;
+ unsafe fn put_client_initialize_cxx(
+ holder_char: *const c_char,
+ ) -> Result<()>;
unsafe fn put_client_write_cxx(
+ holder_char: *const c_char,
field_index: usize,
data: *const c_char,
) -> Result<()>;
- fn put_client_blocking_read_cxx() -> Result<String>;
- fn put_client_terminate_cxx() -> Result<()>;
+ unsafe fn put_client_blocking_read_cxx(
+ holder_char: *const c_char,
+ ) -> Result<String>;
+ unsafe fn put_client_terminate_cxx(
+ holder_char: *const c_char,
+ ) -> Result<()>;
unsafe fn get_client_initialize_cxx(
holder_char: *const c_char,
) -> Result<()>;
diff --git a/services/backup/blob_client/src/put_client.rs b/services/backup/blob_client/src/put_client.rs
--- a/services/backup/blob_client/src/put_client.rs
+++ b/services/backup/blob_client/src/put_client.rs
@@ -8,10 +8,13 @@
use proto::PutRequest;
use crate::constants::{BLOB_ADDRESS, MPSC_CHANNEL_BUFFER_CAPACITY};
-use crate::tools::{report_error, check_error};
+use crate::tools::{
+ c_char_pointer_to_string, check_error, report_error, string_to_c_char_pointer,
+};
use lazy_static::lazy_static;
use libc;
use libc::c_char;
+use std::collections::HashMap;
use std::ffi::CStr;
use std::sync::Mutex;
use tokio::runtime::Runtime;
@@ -32,17 +35,18 @@
}
lazy_static! {
- static ref CLIENT: Mutex<Option<BidiClient>> =
- Mutex::new(None);
- // todo we should probably create separate clients for different IDs
+ // todo: we should consider limiting the clients size,
+ // if every client is able to allocate up to 4MB data at a time
+ static ref CLIENTS: Mutex<HashMap<String, BidiClient>> =
+ Mutex::new(HashMap::new());
static ref RUNTIME: Runtime = Runtime::new().unwrap();
static ref ERROR_MESSAGES: Mutex<Vec<String>> =
Mutex::new(Vec::new());
}
-fn is_initialized() -> bool {
- match CLIENT.lock() {
- Ok(client) => client.is_some(),
+fn is_initialized(holder: &str) -> bool {
+ match CLIENTS.lock() {
+ Ok(clients) => clients.contains_key(holder),
_ => {
report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
false
@@ -50,11 +54,17 @@
}
}
-pub fn put_client_initialize_cxx() -> Result<(), String> {
- if is_initialized() {
- put_client_terminate_cxx()?;
+pub fn put_client_initialize_cxx(
+ holder_char: *const c_char,
+) -> Result<(), String> {
+ let holder = c_char_pointer_to_string(holder_char)?;
+ if is_initialized(&holder) {
+ put_client_terminate_cxx(string_to_c_char_pointer(&holder)?)?;
}
- assert!(!is_initialized(), "client cannot be initialized twice");
+ assert!(
+ !is_initialized(&holder),
+ "client cannot be initialized twice"
+ );
// grpc
if let Ok(mut grpc_client) =
RUNTIME.block_on(async { BlobServiceClient::connect(BLOB_ADDRESS).await })
@@ -123,10 +133,10 @@
Ok(maybe_response_message) => {
let mut result = false;
if let Some(response_message) = maybe_response_message {
- // warning: this will produce an error if there's more unread responses than
- // MPSC_CHANNEL_BUFFER_CAPACITY
- // you should then use put_client_blocking_read_cxx in order to dequeue
- // the responses in c++ and make room for more
+ // warning: this will produce an error if there's more unread
+ // responses than MPSC_CHANNEL_BUFFER_CAPACITY
+ // you should then use put_client_blocking_read_cxx in order
+ // to dequeue the responses in c++ and make room for more
if let Ok(_) = response_thread_tx
.try_send((response_message.data_exists as i32).to_string())
{
@@ -156,24 +166,35 @@
};
});
- if let Ok(mut client) = CLIENT.lock() {
- *client = Some(BidiClient {
+ if is_initialized(&holder) {
+ return Err(format!(
+ "client initialization overlapped for holder {}",
+ holder
+ ));
+ }
+ if let Ok(mut clients) = CLIENTS.lock() {
+ let client = BidiClient {
tx: request_thread_tx,
rx: response_thread_rx,
rx_handle,
- });
+ };
+ (*clients).insert(holder, client);
return Ok(());
}
- return Err("could not access client".to_string());
+ return Err(format!("could not access client for holder {}", holder));
}
Err("could not successfully connect to the blob server".to_string())
}
-pub fn put_client_blocking_read_cxx() -> Result<String, String> {
+pub fn put_client_blocking_read_cxx(
+ holder_char: *const c_char,
+) -> Result<String, String> {
+ let holder = c_char_pointer_to_string(holder_char)?;
check_error(&ERROR_MESSAGES)?;
let response: Option<String> = RUNTIME.block_on(async {
- if let Ok(mut maybe_client) = CLIENT.lock() {
- if let Some(mut client) = (*maybe_client).take() {
+ if let Ok(mut clients) = CLIENTS.lock() {
+ let maybe_client = clients.get_mut(&holder);
+ if let Some(client) = maybe_client {
if let Some(data) = client.rx.recv().await {
return Some(data);
} else {
@@ -183,9 +204,8 @@
Some("put"),
);
}
- *maybe_client = Some(client);
} else {
- report_error(&ERROR_MESSAGES, "no client detected", Some("put"));
+ report_error(&ERROR_MESSAGES, "no client detected in blocking read", Some("put"));
}
} else {
report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
@@ -203,16 +223,19 @@
* 3 - data chunk (bytes)
*/
pub fn put_client_write_cxx(
+ holder_char: *const c_char,
field_index: usize,
data: *const c_char,
) -> Result<(), String> {
+ let holder = c_char_pointer_to_string(holder_char)?;
check_error(&ERROR_MESSAGES)?;
let data_c_str: &CStr = unsafe { CStr::from_ptr(data) };
let data_bytes: Vec<u8> = data_c_str.to_bytes().to_vec();
RUNTIME.block_on(async {
- if let Ok(mut maybe_client) = CLIENT.lock() {
- if let Some(client) = (*maybe_client).take() {
+ if let Ok(clients) = CLIENTS.lock() {
+ let maybe_client = clients.get(&holder);
+ if let Some(client) = maybe_client {
match client
.tx
.send(PutRequestData {
@@ -228,9 +251,8 @@
Some("put"),
),
}
- *maybe_client = Some(client);
} else {
- report_error(&ERROR_MESSAGES, "no client detected", Some("put"));
+ report_error(&ERROR_MESSAGES, "no client detected in write", Some("put"));
}
} else {
report_error(&ERROR_MESSAGES, "couldn't access client", Some("put"));
@@ -240,15 +262,19 @@
Ok(())
}
-pub fn put_client_terminate_cxx() -> Result<(), String> {
+pub fn put_client_terminate_cxx(
+ holder_char: *const c_char,
+) -> Result<(), String> {
+ let holder = c_char_pointer_to_string(holder_char)?;
check_error(&ERROR_MESSAGES)?;
- if !is_initialized() {
+ if !is_initialized(&holder) {
check_error(&ERROR_MESSAGES)?;
return Ok(());
}
- if let Ok(mut maybe_client) = CLIENT.lock() {
- if let Some(client) = (*maybe_client).take() {
+ if let Ok(mut clients) = CLIENTS.lock() {
+ let maybe_client = clients.remove(&holder);
+ if let Some(client) = maybe_client {
drop(client.tx);
RUNTIME.block_on(async {
if client.rx_handle.await.is_err() {
@@ -260,14 +286,14 @@
}
});
} else {
- return Err("no client detected".to_string());
+ return Err("no client detected in terminate".to_string());
}
} else {
return Err("couldn't access client".to_string());
}
assert!(
- !is_initialized(),
+ !is_initialized(&holder),
"client transmitter handler released properly"
);
check_error(&ERROR_MESSAGES)?;

File Metadata

Mime Type
text/plain
Expires
Fri, Jan 2, 9:40 PM (18 h, 40 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5883339
Default Alt Text
D5035.1767390051.diff (8 KB)

Event Timeline